package com.atuguigu.flink.sensor;

import netscape.security.UserTarget;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demonstrates three equivalent ways of keeping only the readings whose id is
 * {@code "sensor_1"} from the stream produced by {@code SensorSource}:
 * an anonymous {@link FilterFunction}, a named {@link FilterFunction}
 * ({@link MyFilter}), and a {@link FlatMapFunction} that emits zero or one
 * element per input.
 */
public class FilterExample {

    /** Id of the only sensor whose readings are kept by every variant below. */
    private static final String TARGET_SENSOR_ID = "sensor_1";

    /**
     * Builds the three pipelines on the same source stream, attaches a print
     * sink to each of them and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<SendsorReading> stream = env.addSource(new SensorSource());

        // TODO 1: anonymous class
        stream.filter(
                new FilterFunction<SendsorReading>() {
                    @Override
                    public boolean filter(SendsorReading sendsorReading) throws Exception {
                        // Constant-first equals: a reading with a null id is rejected
                        // instead of failing with a NullPointerException.
                        return TARGET_SENSOR_ID.equals(sendsorReading.id);
                    }
                }
        ).print();

        // TODO 2: a better approach - a reusable, named filter class
        stream.filter(
                new MyFilter()
        ).print();

        // TODO 3: the same filter implemented with flatMap
        stream.flatMap(
                new FlatMapFunction<SendsorReading, SendsorReading>() {
                    @Override
                    public void flatMap(SendsorReading sendsorReading, Collector<SendsorReading> collector) throws Exception {
                        // flatMap may emit any number of elements per input; emitting the
                        // reading only when it matches reproduces the filter semantics.
                        if (TARGET_SENSOR_ID.equals(sendsorReading.id)) {
                            collector.collect(sendsorReading);
                        }
                    }
                }
        ).print();

        env.execute();
    }

    /**
     * Named filter that keeps only the readings whose id is {@code "sensor_1"}.
     */
    public static class MyFilter implements FilterFunction<SendsorReading> {

        /**
         * Tells whether the given reading should be kept.
         *
         * @param sendsorReading the reading to test
         * @return {@code true} if the reading's id equals {@code "sensor_1"};
         *         {@code false} otherwise, including when the id is {@code null}
         */
        @Override
        public boolean filter(SendsorReading sendsorReading) throws Exception {
            return TARGET_SENSOR_ID.equals(sendsorReading.id);
        }
    }
}
